package me.zhengjie.modules.business.websocket;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.exception.BadRequestException;
import me.zhengjie.modules.business.util.ChatRedisUtil;
import me.zhengjie.modules.business.util.ChatTypeVerificationUtil;
import me.zhengjie.modules.business.vo.ChatVO;
import me.zhengjie.utils.SpringContextHolder;
import me.zhengjie.utils.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 功能描述：自定义websocket服务
 *
 * @author RenShiWei
 * Date: 2020/2/6 14:20
 **/
@ServerEndpoint("/ws/{sid}")
@Slf4j
@Component
public class ChatWebSocketServer{

    /**
     *  线程池
     */
//    private final static ThreadPoolExecutor executor = ThreadPoolExecutorUtil.getPoll();

    /**
     * 静态变量，用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set，用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, ChatWebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话，需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userId = "";

    public ChatWebSocketServer () {

    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen ( Session session, @PathParam("sid") String userId ) {
        this.session = session;
        this.userId = userId;
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            webSocketMap.put(userId, this);
            //加入set中
        } else {
            webSocketMap.put(userId, this);
            //加入set中
            addOnlineCount();
            //在线数加1
        }
        log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
//        try {
//            sendMessage("系统消息：连接成功");
//        } catch (IOException e) {
//            log.error("用户:" + userId + ",网络异常!!!!!!");
//        }

    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose () {
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage ( String message, Session session ) {
        log.info("用户消息:" + userId + ",报文:" + message);
        //给指定用户发送消息
//        executor.execute(new Runnable() {
//            @Override
//            public void run() {
                sendInfoByJson(message);
//            }
//        });
    }

    @OnError
    public void onError ( Session session, Throwable error ) {
        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务器主动推送
     */
    private void sendMessage ( String message ) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * 功能描述：获取websocket的连接数
     *
     * @return websocket的连接数
     * @author RenShiWei
     * Date: 2020/2/6 14:37
     */
    public static synchronized int getOnlineCount () {
        return onlineCount;
    }

    /**
     * 功能描述：websocket的连接数+1
     *
     * @author RenShiWei
     * Date: 2020/2/6 14:39
     */
    public static synchronized void addOnlineCount () {
        ChatWebSocketServer.onlineCount++;
    }

    /**
     * 功能描述：websocket的连接数-1
     *
     * @author RenShiWei
     * Date: 2020/2/6 14:39
     */
    public static synchronized void subOnlineCount () {
        ChatWebSocketServer.onlineCount--;
    }

    /**
     * 功能描述：给所有连接websocket的用户群发消息
     * xxx：应该是给平台所有用户发送消息（后期需要修改）
     *
     * @param message 发送的消息内容
     * @author RenShiWei
     * Date: 2020/2/6 14:40
     */
    public static void sendInfoToAllUser ( String message ) {
        if (StringUtils.isNotBlank(message)) {
            try {
                //给所有连接的webSocket发送消息————应该是给所有客户端发送（后期再说）
                for (ChatWebSocketServer ws : webSocketMap.values()) {
                    ws.sendMessage(message);
                    log.info("推送消息到" + ws.userId + "，推送内容:" + message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 发送自定义消息（单发消息、消息无格式处理）
     *
     * @param message 发送的消息内容
     * @param userId  接收方id
     * @author RenShiWei
     * Date: 2020/2/6 14:40
     */
    public static void sendInfoToSingleUser ( String message, @PathParam("userId") String userId ) throws IOException {
        log.info("发送消息到:" + userId + "，报文:" + message);
        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
            webSocketMap.get(userId).sendMessage(message);
        } else {
            log.error("用户" + userId + ",不在线！");
        }
    }

    /**
     * 功能描述：将json格式的消息，发送给指定用户
     *
     * @param message json信息 对应ChatVO实体
     * @author RenShiWei
     * Date: 2020/2/6 21:39
     */
    public void sendInfoByJson ( String message ) {
        if (StringUtils.isNotBlank(message)) {
            try {
                ChatVO chatVO = JSONUtil.toBean(message, ChatVO.class, true);
                //判断消息类型是否符合预定的规范
                if (ChatTypeVerificationUtil.verifyChatType(chatVO.getChatMessageType())) {
                    //追加发送人(防止串改)
                    chatVO.setFromUserId(Integer.parseInt(this.userId));
                    //传送给对应toUserId用户的websocket
                    String toUserId = chatVO.getToUserId().toString();
                    if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) {
                        //心跳包消息
                        if(chatVO.getChatMessageType().equals(ChatTypeVerificationUtil.ChatMessageTypeEnum.HEART.toString().toLowerCase())){
                            System.out.println(chatVO);
                            webSocketMap.get(toUserId).sendMessage(JSONUtil.toJsonStr(chatVO));
                            System.out.println("userId:"+toUserId+" 发送心跳包成功");
                            return ;
                        }
                        //发送消息
                        webSocketMap.get(toUserId).sendMessage(JSONUtil.toJsonStr(chatVO));
                        //消息保存到redis   采用框架自身使用的redis的方式，还不理解
                        ChatRedisUtil chatRedisUtil = SpringContextHolder.getBean(ChatRedisUtil.class);
                        //拼接发送方和接收方，成为房间号
                        String key = chatRedisUtil.createChatNumber(chatVO.getQuestionId(),chatVO.getFromUserId(), chatVO.getToUserId());
                        //聊天记录存储进redis
                        boolean isCache = chatRedisUtil.saveCacheChatMessage(key, chatVO);
                        //缓存失败，抛出异常
                        if (!isCache) {
                            throw new BadRequestException("聊天内容缓存失败,聊天信息内容：" + message);
                        }
                    } else {
                        log.error("请求的userId:" + toUserId + "不在该服务器上");
                        //否则不在这个服务器上，发送到mysql或者redis
                    }
                } else {
                    throw new BadRequestException("发送的消息类型暂不支持");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
